package com.shiku.imserver.service;

import com.alibaba.fastjson.JSONObject;
import com.shiku.imserver.CoreService;
import com.shiku.imserver.common.AbstractService;
import com.shiku.imserver.common.ImConfig;
import com.shiku.imserver.common.constant.KConstants;
import com.shiku.imserver.common.message.ChatMessage;
import com.shiku.utils.StringUtil;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketmqService extends AbstractService {
  public static Logger logger = LoggerFactory.getLogger(RocketmqService.class);
  
  private ImConfig.MQConfig mqConfig;
  
  private DefaultMQProducer userStatusProducer;
  
  private DefaultMQProducer pushProducer;
  
  public boolean initialize() {
    this.mqConfig = IMBeanUtils.getImconfig().getMqConfig();
    getUserStatusProducer();
    getPushProducer();
    return false;
  }
  
  public DefaultMQProducer getUserStatusProducer() {
    if (null != this.userStatusProducer)
      return this.userStatusProducer; 
    synchronized (this.mqConfig) {
      try {
        System.out.println("userStatusProducer init  " + this.mqConfig.getNameAddr());
        this.userStatusProducer = new DefaultMQProducer("userStatusProducer");
        this.userStatusProducer.setNamesrvAddr(this.mqConfig.getNameAddr());
        this.userStatusProducer.setVipChannelEnabled(false);
        this.userStatusProducer.setCreateTopicKey("userStatusMessage");
        this.userStatusProducer.start();
        Thread.sleep(500L);
      } catch (Exception e) {
        System.err.println(e.getMessage());
      } 
    } 
    return this.userStatusProducer;
  }
  
  public DefaultMQProducer getPushProducer() {
    if (null != this.pushProducer)
      return this.pushProducer; 
    synchronized (this.mqConfig) {
      try {
        this.pushProducer = new DefaultMQProducer("pushProducer");
        this.pushProducer.setNamesrvAddr(this.mqConfig.getNameAddr());
        this.pushProducer.setVipChannelEnabled(false);
        this.pushProducer.start();
        Thread.sleep(500L);
      } catch (Exception e) {
        e.printStackTrace();
      } 
    } 
    return this.pushProducer;
  }
  
  public void restartProducer(DefaultMQProducer producer) {
    String producerGroup = producer.getProducerGroup();
    String namesrvAddr = producer.getNamesrvAddr();
    System.out.println(" restartProducer ===" + producerGroup);
    try {
      if (null != producer && null != producer.getDefaultMQProducerImpl()) {
        if (ServiceState.CREATE_JUST == producer.getDefaultMQProducerImpl().getServiceState())
          producer.start(); 
      } else {
        synchronized (producer) {
          producer = new DefaultMQProducer(producerGroup);
          producer.setNamesrvAddr(namesrvAddr);
          producer.setVipChannelEnabled(false);
          producer.start();
          Thread.sleep(500L);
        } 
      } 
    } catch (Exception e) {
      logger.error(e.getMessage(), e);
    } 
  }
  
  public void handleLogin(String connStr) {
    try {
      String resource = CoreService.parseResource(connStr);
      String userIdStr = CoreService.parseBareUserId(connStr);
      logger.info("handleLogin == userId ====> {} resource {}", userIdStr, resource);
      if (!IMBeanUtils.getImconfig().isPushUserStatus())
        return; 
      long userId = Long.valueOf(userIdStr).longValue();
      if (0L == userId) {
        logger.info("handleLogin ====> userId  is zero ");
        return;
      } 
      if (null == resource || "".equals(resource)) {
        logger.info("handleLogin ====> getResource  is null ");
        return;
      } 
      //String msg = userId + ":" + '\001' + ":" + resource;
      String msg = userId + ":" + "1" + ":" + resource;
      Message message = new Message("userStatusMessage", msg.getBytes("utf-8"));
      try {
        SendResult result = getUserStatusProducer().send(message);
        if (SendStatus.SEND_OK != result.getSendStatus())
          System.out.println(result.toString()); 
      } catch (Exception e) {
        logger.error(e.getMessage(), e);
        restartProducer(getUserStatusProducer());
      } 
    } catch (Exception e) {
      e.printStackTrace();
    } 
  }
  
  public void closeConnection(String connStr) {
    try {
      String resource = CoreService.parseResource(connStr);
      String userIdStr = CoreService.parseBareUserId(connStr);
      logger.info("closeConnection == userId ====> {} resource {}", userIdStr, resource);
      if (!IMBeanUtils.getImconfig().isPushUserStatus())
        return; 
      if (StringUtil.isEmpty(userIdStr) || "null".equals(userIdStr) || null == resource)
        return; 
      Message message = null;
      long userId = 0L;
      try {
        userId = Long.valueOf(userIdStr).longValue();
        if (0L == userId) {
          if (KConstants.isDebug)
            logger.info("closeConnection ====> userId  is zero "); 
          return;
        } 
        if (StringUtil.isEmpty(resource) || "null".equals(userIdStr)) {
          if (KConstants.isDebug)
            logger.info("closeConnection ====> getResource  is null "); 
          return;
        } 
      } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return;
      } 
      try {
        String msg = userId + ":" + Character.MIN_VALUE + ":" + resource;
        message = new Message("userStatusMessage", msg.getBytes("utf-8"));
        SendResult result = getUserStatusProducer().send(message);
        if (SendStatus.SEND_OK != result.getSendStatus())
          System.out.println(result.toString()); 
      } catch (Exception e) {
        logger.error(e.getMessage(), e);
        restartProducer(getUserStatusProducer());
      } 
    } catch (Exception e) {
      e.printStackTrace();
    } 
  }
  
  public void offMessagePush(ChatMessage chatMessage) {
    int contextType = chatMessage.getType();
    if (26 == contextType || 27 == contextType || 200 == contextType || 201 == contextType)
      return; 
    JSONObject jsonBody = new JSONObject();
    jsonBody.put("type", Short.valueOf(chatMessage.getType()));
    jsonBody.put("content", chatMessage.getContent());
    jsonBody.put("fromUserId", chatMessage.getFromUserId());
    jsonBody.put("toUserId", chatMessage.getToUserId());
    jsonBody.put("fromUserName", chatMessage.getFromUserName());
    jsonBody.put("toUserName", chatMessage.getToUserName());
    jsonBody.put("objectId", chatMessage.getObjectId());
    jsonBody.put("fileName", chatMessage.getFileName());
    jsonBody.put("isReadDel", Integer.valueOf(chatMessage.isReadDel() ? 1 : 0));
    jsonBody.put("isEncrypt", Integer.valueOf(chatMessage.isEncrypt() ? 1 : 0));
    if (2 == chatMessage.getMessageHead().getChatType()) {
      jsonBody.put("roomJid", chatMessage.getMessageHead().getTo());
      jsonBody.put("isGroup", Boolean.valueOf(true));
    } else {
      jsonBody.put("isGroup", Boolean.valueOf(false));
    } 
    Message message = null;
    try {
      message = new Message("pushMessage", jsonBody.toJSONString().getBytes("utf-8"));
    } catch (Exception e) {
      logger.error(e.getMessage(), e);
      return;
    } 
    try {
      SendResult result = getPushProducer().send(message);
      if (SendStatus.SEND_OK != result.getSendStatus())
        System.out.println(result.toString()); 
    } catch (Exception e) {
      logger.error(e.getMessage(), e);
      restartProducer(getPushProducer());
    } 
  }
}
